Skip to content

Conversation

@tomncooper
Copy link
Contributor

@tomncooper tomncooper commented Nov 21, 2024

What is the purpose of the change

The operator currently allows the following syntax for defining flink version specific defaults:

kubernetes.operator.default-configuration.flink-version.v1_18.key: value

The problem with this is that, in many cases, these defaults should be applied to newer Flink versions as well, forcing config duplications.

This PR introduces a new "greater than" syntax for config defaults, indicating that they should be applied to a given version and above:
kubernetes.operator.default-configuration.flink-version.v1_18+.key: value

In this case key:value would be applied to all Flink version greater or equal to 1.18, unless overridden for specific versions.

Brief change log

  • Adds a new method getRelevantVersionPrefixes, to the org.apache.flink.kubernetes.operator.config.FlinkConfigManager, which identifies all Flink version default config prefixes which are relevant to the currently specified Flink version. These are then saved in cache (map), keyed on Flink version.
  • Refactored getDefaultConfig to call the cached flink version prefixes or generated them if none exist.
  • Clear the prefix cache when updateDefaultConfig is called. This ensures new prefixes will be calculated on the next call to getDefaultConfig.
  • Refactored the FlinkVersion enum to specify major and minor semver integers to facilitate quick look up of relevant Flink versions when parsing version strings.

Verifying this change

This change added tests and can be verified as follows:

  • Added additional tests in the FlinkConfigManagerTest class to cover the new Regex and getRelevantVersionPrefixes methods.
  • Updated the testVersionNamespaceDefaultConfs test in FlinkConfigManagerTest to test the greater than version behaviour.
  • Added a FlinkVersionTest to ensure refactoring of the enum was correct.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., is any changes to the CustomResourceDescriptors: no
  • Core observer or reconciler logic that is regularly executed: yes

Documentation

  • Does this pull request introduce a new feature? yes
  • If yes, how is the feature documented? Update the configuration docs

@tomncooper
Copy link
Contributor Author

I probably need to look at adding an end to end test for this but want to make sure I am on the right track before doing that.

@tomncooper
Copy link
Contributor Author

CC @gyfora

@tomncooper tomncooper changed the title [FLINK-36529] Allow flink version configs to be set to greater than given version [FLINK-36529] Allow Flink version configs to be set to greater than given version Nov 21, 2024
@gyfora
Copy link
Contributor

gyfora commented Nov 21, 2024

Looks great @tomncooper thanks for picking this up, I think this functionality is fairly well unit testable so I would not bother with an e2e (that would just add to our build time)

@gyfora
Copy link
Contributor

gyfora commented Nov 21, 2024

I will try to review this in detail tomorrow but scrolling through it, it look pretty good already

@tomncooper tomncooper force-pushed the FLINK-36529-greater-than-configs branch from ce3d191 to 28f9c67 Compare November 22, 2024 11:31
Comment on lines +239 to +262
for (Map.Entry<String, String> entry : baseConfMap.entrySet()) {
Matcher versionMatcher = FLINK_VERSION_PATTERN.matcher(entry.getKey());
if (versionMatcher.matches() && versionMatcher.group("gt") != null) {
try {
FlinkVersion keyFlinkVersion =
FlinkVersion.fromMajorMinor(
Integer.parseInt(versionMatcher.group("major")),
Integer.parseInt(versionMatcher.group("minor")));
if (flinkVersion.isEqualOrNewer(keyFlinkVersion)) {
greaterThanVersionPrefixes.put(
keyFlinkVersion,
VERSION_CONF_PREFIX
+ keyFlinkVersion
+ KubernetesOperatorConfigOptions
.FLINK_VERSION_GREATER_THAN_SUFFIX
+ ".");
}
} catch (NumberFormatException numberFormatException) {
LOG.warn("Unable to parse version number in config key: {}", entry.getKey());
} catch (IllegalArgumentException illegalArgumentException) {
LOG.warn("Unknown Flink version in config key: {}", entry.getKey());
}
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Couldn't we generate the relevant version prefix list once for each supported Flink version? That way we wouldn't need to do this again and again.

Copy link
Contributor Author

@tomncooper tomncooper Nov 22, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you mean, for every supported version create all possible prefixes and then loop through them for the given FlinkVersion? That would simplify things, but we would have wasted calls to applyDefault which scans the whole base config each time.

Copy link
Contributor Author

@tomncooper tomncooper Nov 22, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The current implementation scans baseConfig once and only creates prefixes for version that are there. Of course I could refactor applyDefault to accept a list of prefixes and check each config against them. Which may be less checks overall.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could also do this once, when the base config changes in the config manager

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So I added a cache (map) in the config manager (keyed on flink version) and call that cache inside getDefaultConfig. I clear the cache in updateDefaultConfig so we only do the calc once for each FlinkVersion:baseConfig combination.

@tomncooper tomncooper force-pushed the FLINK-36529-greater-than-configs branch from 28f9c67 to 2e05ee6 Compare November 22, 2024 15:59
@tomncooper tomncooper requested a review from gyfora November 25, 2024 17:32
@tomncooper tomncooper force-pushed the FLINK-36529-greater-than-configs branch from 7261b9d to 3f62577 Compare November 25, 2024 18:24
@tomncooper
Copy link
Contributor Author

@gyfora Looks like the CI was cancelled rather than failed?

@gyfora
Copy link
Contributor

gyfora commented Nov 27, 2024

There is an error in the CI

Please generate the java doc via 'mvn clean install -DskipTests -Pgenerate-docs' again

@tomncooper tomncooper force-pushed the FLINK-36529-greater-than-configs branch from 3f62577 to 738821c Compare November 27, 2024 18:33
@gyfora gyfora merged commit 9bab028 into apache:main Nov 28, 2024
104 checks passed
@tomncooper tomncooper deleted the FLINK-36529-greater-than-configs branch November 28, 2024 14:49
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants